<?php
require_once(Kohana::find_file('vendor','phprpc/phprpc_server'));

/**
 * Created by PhpStorm.
 * User: cai
 * Date: 16-2-8
 * Time: 上午11:26
 */
class Controller_Triggerserver extends Controller
{
    /**
     * 数据同步服务
     */
    public function Action_syn()
    {
        $server = new PHPRPC_Server();
        $server->add('rpcdata',$this);
        $server->start();
    }

    // 分析保存数据
    public function rpcdata($data)
    {

//        if(isset($_GET['environment'])&&$_GET['environment']){
//            $configenvironment = Kohana::$config->load('triggerenvironment');
//            $environment_str   = $configenvironment->get('environment');
//            if($environment_str==$_GET['environment']){
//                //环境正确
//            }else{
//                return 'environment ['.$_GET['environment'].'->'.$environment_str.'] is wrong!';
//            }
//        }else{
//            return 'environment is empty!';
//        }

        //临时log记录
        //Kohana::$log->add(Log::INFO, 'Rpc get data follow');
        //Kohana::$log->add(Log::INFO, var_export($data,TRUE));

        $arr_insert  = $arr_update = $arr_delete = array();
        $ids         = $ids_delete = $ids_insert = $ids_update = array();
        $row1        = reset($data);
        $tablename   = "wms_".$row1['table'];
        $warehouseid = $row1['warehouseid'];
        unset($row1);
        $lock_file = APPPATH.'cache/rpcdata_'.$tablename.'.lock';
        $fp        = fopen($lock_file,'w');
        if(flock($fp,LOCK_EX)){ // 文件锁
            foreach($data as $key => $row){
                if($row['act']=='add'){
                    $ids_insert[] = $key;
                    if(empty($row['data'])){
                        continue;
                    }
                    $arr_insert[$row['tableid']] = $row['data'];
                }elseif($row['act']=='update'){
                    $ids_update[] = $key;
                    if(empty($row['data'])){
                        continue;
                    }
                    $arr_update[$row['tableid']][] = $row;
                }else{
                    $ids_delete[]                = $key;
                    $arr_delete[$row['tableid']] = $row['data'];
                    unset($arr_update[$row['tableid']],$arr_insert[$row['tableid']]);
                }
            }
            $records = array();
            if(sizeof($arr_update)){
                $sql  = "SELECT * FROM $tablename 
                    WHERE id IN (".implode(',',array_map('intval',array_keys($arr_update))).")";
                $rows = DB::query(Database::SELECT,$sql)
                    ->execute('fbaerp')
                    ->as_array();
                if(is_array($rows)&&sizeof($rows)){
                    foreach($rows as $row){
                        $records[$row['id']] = $row;
                    }
                }
                /*
                $arr_new = array_diff_key($arr_update, $records);       // 找出update中不存在的ID记录
                foreach($arr_new as $key => $var) {
                    $arr_insert[$key] = $var[0]['data'];
                    unset($arr_update[$key]);
                }
                */
            }
            // 执行新增操作
            if(sizeof($arr_insert)){
                $sql  = "SELECT id FROM $tablename 
                    WHERE id IN (".implode(',',array_map('intval',array_keys($arr_insert))).")";
                $rows = DB::query(Database::SELECT,$sql)
                    ->execute('fbaerp')
                    ->as_array();
                if(is_array($rows)&&sizeof($rows)){
                    foreach($rows as $row){
                        $arr_insert[$row['id']]['crontrigger'] = 2;
                        DB::update($tablename)
                            ->set($arr_insert[$row['id']])
                            ->where('id','=',$row['id'])
                            ->execute('fbaerp');
                        unset($arr_insert[$row['id']]);
                    }
                }
                if(sizeof($arr_insert)){
                    foreach($arr_insert as $row){
                        $row['crontrigger'] = 1;
                        DB::insert($tablename,array_keys($row))
                            ->values($row)
                            ->execute('fbaerp');
                    }
                }
            }
            $ids = $ids_insert;
            // 执行更新操作
            if(sizeof($arr_update)){
                $conflicts = $update_data = array();
                $editions  = $this->getEdition($tablename,array_keys($arr_update),$warehouseid);
                foreach($arr_update as $tableid => $rows){
                    if(isset($arr_insert[$tableid])){
                        continue;
                    }  // 如果是新数据，则该数据任何的upate操作都可以忽略
                    $data_client = $data_server = array();
                    $update_info = $arr_field_time = array();
                    foreach($rows as $row){
                        if(empty($row['info'])){
                            continue;
                        }   // 没任何字段更新（问题数据）？
                        foreach(array_filter(explode(',',$row['info'])) as $field){
                            $arr_field_time[$field] = $row['addtime'];          // 根据ID升序的原则，后者替换前者
                        }
                    }
                    if(isset($editions[$tableid])){
                        $arr_field_time_local = $editions[$tableid];        // 本库最后被修改字段的时间
                        foreach($arr_field_time as $field => $modify_time){
                            $data_client[$field] = $rows[0]['data'][$field];
                            // 如果某字段数据在本库更新在后，则拒绝更新此字段的值
                            if(isset($arr_field_time_local[$field])&&($arr_field_time_local[$field]>=$modify_time)){
                                continue;
                            }
                            $update_info[$field] = $rows[0]['data'][$field];
                        }
                        $data_server = array_intersect_key($records[$tableid],$data_client);
                        // 版本冲突信息：客户端数据，服务端数据，更新后数据
                        $conflicts[$tableid] = array(
                            'client'  => $data_client,
                            'server'  => $data_server,
                            'updated' => $update_info
                        );
                        if(sizeof($update_info)){
                            $update_info['crontrigger'] = 2;
                            $update_data[$tableid]      = $update_info;
                        }
                    }else{
                        // 如果不存在冲突，则可直接进行数据覆盖
                        $info = $rows[0]['data'];
                        unset($info['id']);
                        $update_data[$tableid] = $info;
                    }
                }
                // 更新数据
                if(sizeof($update_data)){
                    foreach($update_data as $tableid => $info){
                        DB::update($tablename)
                            ->set($info)
                            ->where('id','=',$tableid)
                            ->execute('fbaerp');
                    }
                }
                // 保存冲突数据
                if(sizeof($conflicts)){
                    foreach($conflicts as $tableid => $row){
                        $arr = array(
                            'tablename' => $tablename,
                            'tableid'   => $tableid,
                            'client'    => serialize($row['client']),
                            'server'    => serialize($row['server']),
                            'updated'   => empty($row['updated'])?'':serialize($row['updated'])
                        );
                        DB::insert('triggertable_conflicts',array(
                            'tablename',
                            'tableid',
                            'client',
                            'server',
                            'updated'
                        ))
                            ->values($arr)
                            ->execute('fbaerp');
                    }
                }
            }
            $ids = array_merge($ids,$ids_update);
            // 执行删除操作
            if(sizeof($arr_delete)){
                DB::delete($tablename)
                    ->where('id','in',array_keys($arr_delete))
                    ->execute('fbaerp');
            }
            $ids = array_merge($ids,$ids_delete);
            // usleep(100000);     // 0.1秒
            flock($fp,LOCK_UN);
        }

        return $ids;
    }

    // 获取数据修改历史
    protected function getEdition($table,$tableids,$warehouseid = 0)
    {
        if(!is_array($tableids)||!sizeof($tableids)||!preg_match('/^[\w]+$/',$table)){
            return array();
        }
        $data  = $editions = array();
        $where = "tablename='{$table}' AND tableid IN (".implode(',',array_map('intval',$tableids)).") ";
        if($warehouseid){
            $where .= " AND warehouseid=".intval($warehouseid);
        }
        $sql  = "SELECT * FROM triggertable WHERE {$where}  ORDER BY id ASC";
        $rows = DB::query(Database::SELECT,$sql)
            ->execute('fbaerp')
            ->as_array();
        if(is_array($rows)&&sizeof($rows)){
            foreach($rows as $row){
                $editions[$row['tableid']][] = $row;
            }
        }
        foreach($editions as $tableid => $rows){
            $arr_field_time = array();
            foreach($rows as $row){
                if(empty($row['info'])){
                    continue;
                }
                foreach(array_filter(explode(',',$row['info'])) as $field){
                    $arr_field_time[$field] = $row['addtime'];          // 根据ID升序的原则，后者替换前者
                }
            }
            $data[$tableid] = $arr_field_time;
        }
        return $data;
    }
}